1 /*
2 * Copyright (C) 2006 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
23 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
24 import static java.lang.Thread.currentThread;
25 import static java.util.Arrays.asList;
26
27 import com.google.common.annotations.Beta;
28 import com.google.common.base.Function;
29 import com.google.common.base.Optional;
30 import com.google.common.base.Preconditions;
31 import com.google.common.collect.ImmutableCollection;
32 import com.google.common.collect.ImmutableList;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Ordering;
35 import com.google.common.collect.Queues;
36 import com.google.common.collect.Sets;
37
38 import java.lang.reflect.Constructor;
39 import java.lang.reflect.InvocationTargetException;
40 import java.lang.reflect.UndeclaredThrowableException;
41 import java.util.Arrays;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.Set;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CancellationException;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.Executor;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.RejectedExecutionException;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.TimeoutException;
54 import java.util.concurrent.atomic.AtomicBoolean;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.logging.Level;
57 import java.util.logging.Logger;
58
59 import javax.annotation.Nullable;
60
61 /**
62 * Static utility methods pertaining to the {@link Future} interface.
63 *
64 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
65 * Guava User Guide article on <a href=
66 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
67 * {@code ListenableFuture}</a>.
68 *
69 * @author Kevin Bourrillion
70 * @author Nishant Thakkar
71 * @author Sven Mawson
72 * @since 1.0
73 */
74 @Beta
75 public final class Futures {
76 private Futures() {}
77
78 /**
79 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
80 * and a {@link Function} that maps from {@link Exception} instances into the
81 * appropriate checked type.
82 *
83 * <p>The given mapping function will be applied to an
84 * {@link InterruptedException}, a {@link CancellationException}, or an
85 * {@link ExecutionException}.
86 * See {@link Future#get()} for details on the exceptions thrown.
87 *
88 * @since 9.0 (source-compatible since 1.0)
89 */
90 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
91 ListenableFuture<V> future, Function<? super Exception, X> mapper) {
92 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
93 }
94
95 private abstract static class ImmediateFuture<V>
96 implements ListenableFuture<V> {
97
98 private static final Logger log =
99 Logger.getLogger(ImmediateFuture.class.getName());
100
101 @Override
102 public void addListener(Runnable listener, Executor executor) {
103 checkNotNull(listener, "Runnable was null.");
104 checkNotNull(executor, "Executor was null.");
105 try {
106 executor.execute(listener);
107 } catch (RuntimeException e) {
108 // ListenableFuture's contract is that it will not throw unchecked
109 // exceptions, so log the bad runnable and/or executor and swallow it.
110 log.log(Level.SEVERE, "RuntimeException while executing runnable "
111 + listener + " with executor " + executor, e);
112 }
113 }
114
115 @Override
116 public boolean cancel(boolean mayInterruptIfRunning) {
117 return false;
118 }
119
120 @Override
121 public abstract V get() throws ExecutionException;
122
123 @Override
124 public V get(long timeout, TimeUnit unit) throws ExecutionException {
125 checkNotNull(unit);
126 return get();
127 }
128
129 @Override
130 public boolean isCancelled() {
131 return false;
132 }
133
134 @Override
135 public boolean isDone() {
136 return true;
137 }
138 }
139
140 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
141
142 @Nullable private final V value;
143
144 ImmediateSuccessfulFuture(@Nullable V value) {
145 this.value = value;
146 }
147
148 @Override
149 public V get() {
150 return value;
151 }
152 }
153
154 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
155 extends ImmediateFuture<V> implements CheckedFuture<V, X> {
156
157 @Nullable private final V value;
158
159 ImmediateSuccessfulCheckedFuture(@Nullable V value) {
160 this.value = value;
161 }
162
163 @Override
164 public V get() {
165 return value;
166 }
167
168 @Override
169 public V checkedGet() {
170 return value;
171 }
172
173 @Override
174 public V checkedGet(long timeout, TimeUnit unit) {
175 checkNotNull(unit);
176 return value;
177 }
178 }
179
180 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
181
182 private final Throwable thrown;
183
184 ImmediateFailedFuture(Throwable thrown) {
185 this.thrown = thrown;
186 }
187
188 @Override
189 public V get() throws ExecutionException {
190 throw new ExecutionException(thrown);
191 }
192 }
193
194 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
195
196 private final CancellationException thrown;
197
198 ImmediateCancelledFuture() {
199 this.thrown = new CancellationException("Immediate cancelled future.");
200 }
201
202 @Override
203 public boolean isCancelled() {
204 return true;
205 }
206
207 @Override
208 public V get() {
209 throw AbstractFuture.cancellationExceptionWithCause(
210 "Task was cancelled.", thrown);
211 }
212 }
213
214 private static class ImmediateFailedCheckedFuture<V, X extends Exception>
215 extends ImmediateFuture<V> implements CheckedFuture<V, X> {
216
217 private final X thrown;
218
219 ImmediateFailedCheckedFuture(X thrown) {
220 this.thrown = thrown;
221 }
222
223 @Override
224 public V get() throws ExecutionException {
225 throw new ExecutionException(thrown);
226 }
227
228 @Override
229 public V checkedGet() throws X {
230 throw thrown;
231 }
232
233 @Override
234 public V checkedGet(long timeout, TimeUnit unit) throws X {
235 checkNotNull(unit);
236 throw thrown;
237 }
238 }
239
240 /**
241 * Creates a {@code ListenableFuture} which has its value set immediately upon
242 * construction. The getters just return the value. This {@code Future} can't
243 * be canceled or timed out and its {@code isDone()} method always returns
244 * {@code true}.
245 */
246 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
247 return new ImmediateSuccessfulFuture<V>(value);
248 }
249
250 /**
251 * Returns a {@code CheckedFuture} which has its value set immediately upon
252 * construction.
253 *
254 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
255 * method always returns {@code true}. Calling {@code get()} or {@code
256 * checkedGet()} will immediately return the provided value.
257 */
258 public static <V, X extends Exception> CheckedFuture<V, X>
259 immediateCheckedFuture(@Nullable V value) {
260 return new ImmediateSuccessfulCheckedFuture<V, X>(value);
261 }
262
263 /**
264 * Returns a {@code ListenableFuture} which has an exception set immediately
265 * upon construction.
266 *
267 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
268 * method always returns {@code true}. Calling {@code get()} will immediately
269 * throw the provided {@code Throwable} wrapped in an {@code
270 * ExecutionException}.
271 */
272 public static <V> ListenableFuture<V> immediateFailedFuture(
273 Throwable throwable) {
274 checkNotNull(throwable);
275 return new ImmediateFailedFuture<V>(throwable);
276 }
277
278 /**
279 * Creates a {@code ListenableFuture} which is cancelled immediately upon
280 * construction, so that {@code isCancelled()} always returns {@code true}.
281 *
282 * @since 14.0
283 */
284 public static <V> ListenableFuture<V> immediateCancelledFuture() {
285 return new ImmediateCancelledFuture<V>();
286 }
287
288 /**
289 * Returns a {@code CheckedFuture} which has an exception set immediately upon
290 * construction.
291 *
292 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
293 * method always returns {@code true}. Calling {@code get()} will immediately
294 * throw the provided {@code Exception} wrapped in an {@code
295 * ExecutionException}, and calling {@code checkedGet()} will throw the
296 * provided exception itself.
297 */
298 public static <V, X extends Exception> CheckedFuture<V, X>
299 immediateFailedCheckedFuture(X exception) {
300 checkNotNull(exception);
301 return new ImmediateFailedCheckedFuture<V, X>(exception);
302 }
303
304 /**
305 * Returns a {@code Future} whose result is taken from the given primary
306 * {@code input} or, if the primary input fails, from the {@code Future}
307 * provided by the {@code fallback}. {@link FutureFallback#create} is not
308 * invoked until the primary input has failed, so if the primary input
309 * succeeds, it is never invoked. If, during the invocation of {@code
310 * fallback}, an exception is thrown, this exception is used as the result of
311 * the output {@code Future}.
312 *
313 * <p>Below is an example of a fallback that returns a default value if an
314 * exception occurs:
315 *
316 * <pre> {@code
317 * ListenableFuture<Integer> fetchCounterFuture = ...;
318 *
319 * // Falling back to a zero counter in case an exception happens when
320 * // processing the RPC to fetch counters.
321 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
322 * fetchCounterFuture, new FutureFallback<Integer>() {
323 * public ListenableFuture<Integer> create(Throwable t) {
324 * // Returning "0" as the default for the counter when the
325 * // exception happens.
326 * return immediateFuture(0);
327 * }
328 * });}</pre>
329 *
330 * <p>The fallback can also choose to propagate the original exception when
331 * desired:
332 *
333 * <pre> {@code
334 * ListenableFuture<Integer> fetchCounterFuture = ...;
335 *
336 * // Falling back to a zero counter only in case the exception was a
337 * // TimeoutException.
338 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
339 * fetchCounterFuture, new FutureFallback<Integer>() {
340 * public ListenableFuture<Integer> create(Throwable t) {
341 * if (t instanceof TimeoutException) {
342 * return immediateFuture(0);
343 * }
344 * return immediateFailedFuture(t);
345 * }
346 * });}</pre>
347 *
348 * <p>Note: If the derived {@code Future} is slow or heavyweight to create
349 * (whether the {@code Future} itself is slow or heavyweight to complete is
350 * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
351 * FutureFallback, Executor) supplying an executor}. If you do not supply an
352 * executor, {@code withFallback} will use a
353 * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
354 * some caveats for heavier operations. For example, the call to {@code
355 * fallback.create} may run on an unpredictable or undesirable thread:
356 *
357 * <ul>
358 * <li>If the input {@code Future} is done at the time {@code withFallback}
359 * is called, {@code withFallback} will call {@code fallback.create} inline.
360 * <li>If the input {@code Future} is not yet done, {@code withFallback} will
361 * schedule {@code fallback.create} to be run by the thread that completes
362 * the input {@code Future}, which may be an internal system thread such as
363 * an RPC network thread.
364 * </ul>
365 *
366 * <p>Also note that, regardless of which thread executes the {@code
367 * fallback.create}, all other registered but unexecuted listeners are
368 * prevented from running during its execution, even if those listeners are
369 * to run in other executors.
370 *
371 * @param input the primary input {@code Future}
372 * @param fallback the {@link FutureFallback} implementation to be called if
373 * {@code input} fails
374 * @since 14.0
375 */
376 public static <V> ListenableFuture<V> withFallback(
377 ListenableFuture<? extends V> input,
378 FutureFallback<? extends V> fallback) {
379 return withFallback(input, fallback, directExecutor());
380 }
381
382 /**
383 * Returns a {@code Future} whose result is taken from the given primary
384 * {@code input} or, if the primary input fails, from the {@code Future}
385 * provided by the {@code fallback}. {@link FutureFallback#create} is not
386 * invoked until the primary input has failed, so if the primary input
387 * succeeds, it is never invoked. If, during the invocation of {@code
388 * fallback}, an exception is thrown, this exception is used as the result of
389 * the output {@code Future}.
390 *
391 * <p>Below is an example of a fallback that returns a default value if an
392 * exception occurs:
393 *
394 * <pre> {@code
395 * ListenableFuture<Integer> fetchCounterFuture = ...;
396 *
397 * // Falling back to a zero counter in case an exception happens when
398 * // processing the RPC to fetch counters.
399 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
400 * fetchCounterFuture, new FutureFallback<Integer>() {
401 * public ListenableFuture<Integer> create(Throwable t) {
402 * // Returning "0" as the default for the counter when the
403 * // exception happens.
404 * return immediateFuture(0);
405 * }
406 * }, directExecutor());}</pre>
407 *
408 * <p>The fallback can also choose to propagate the original exception when
409 * desired:
410 *
411 * <pre> {@code
412 * ListenableFuture<Integer> fetchCounterFuture = ...;
413 *
414 * // Falling back to a zero counter only in case the exception was a
415 * // TimeoutException.
416 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
417 * fetchCounterFuture, new FutureFallback<Integer>() {
418 * public ListenableFuture<Integer> create(Throwable t) {
419 * if (t instanceof TimeoutException) {
420 * return immediateFuture(0);
421 * }
422 * return immediateFailedFuture(t);
423 * }
424 * }, directExecutor());}</pre>
425 *
426 * <p>When the execution of {@code fallback.create} is fast and lightweight
427 * (though the {@code Future} it returns need not meet these criteria),
428 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
429 * omitting the executor} or explicitly specifying {@code
430 * directExecutor}. However, be aware of the caveats documented in the
431 * link above.
432 *
433 * @param input the primary input {@code Future}
434 * @param fallback the {@link FutureFallback} implementation to be called if
435 * {@code input} fails
436 * @param executor the executor that runs {@code fallback} if {@code input}
437 * fails
438 * @since 14.0
439 */
440 public static <V> ListenableFuture<V> withFallback(
441 ListenableFuture<? extends V> input,
442 FutureFallback<? extends V> fallback, Executor executor) {
443 checkNotNull(fallback);
444 return new FallbackFuture<V>(input, fallback, executor);
445 }
446
447 /**
448 * A future that falls back on a second, generated future, in case its
449 * original future fails.
450 */
451 private static class FallbackFuture<V> extends AbstractFuture<V> {
452
453 private volatile ListenableFuture<? extends V> running;
454
455 FallbackFuture(ListenableFuture<? extends V> input,
456 final FutureFallback<? extends V> fallback,
457 final Executor executor) {
458 running = input;
459 addCallback(running, new FutureCallback<V>() {
460 @Override
461 public void onSuccess(V value) {
462 set(value);
463 }
464
465 @Override
466 public void onFailure(Throwable t) {
467 if (isCancelled()) {
468 return;
469 }
470 try {
471 running = fallback.create(t);
472 if (isCancelled()) { // in case cancel called in the meantime
473 running.cancel(wasInterrupted());
474 return;
475 }
476 addCallback(running, new FutureCallback<V>() {
477 @Override
478 public void onSuccess(V value) {
479 set(value);
480 }
481
482 @Override
483 public void onFailure(Throwable t) {
484 if (running.isCancelled()) {
485 cancel(false);
486 } else {
487 setException(t);
488 }
489 }
490 }, directExecutor());
491 } catch (Throwable e) {
492 setException(e);
493 }
494 }
495 }, executor);
496 }
497
498 @Override
499 public boolean cancel(boolean mayInterruptIfRunning) {
500 if (super.cancel(mayInterruptIfRunning)) {
501 running.cancel(mayInterruptIfRunning);
502 return true;
503 }
504 return false;
505 }
506 }
507
508 /**
509 * Returns a new {@code ListenableFuture} whose result is asynchronously
510 * derived from the result of the given {@code Future}. More precisely, the
511 * returned {@code Future} takes its result from a {@code Future} produced by
512 * applying the given {@code AsyncFunction} to the result of the original
513 * {@code Future}. Example:
514 *
515 * <pre> {@code
516 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
517 * AsyncFunction<RowKey, QueryResult> queryFunction =
518 * new AsyncFunction<RowKey, QueryResult>() {
519 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
520 * return dataService.read(rowKey);
521 * }
522 * };
523 * ListenableFuture<QueryResult> queryFuture =
524 * transform(rowKeyFuture, queryFunction);}</pre>
525 *
526 * <p>Note: If the derived {@code Future} is slow or heavyweight to create
527 * (whether the {@code Future} itself is slow or heavyweight to complete is
528 * irrelevant), consider {@linkplain #transform(ListenableFuture,
529 * AsyncFunction, Executor) supplying an executor}. If you do not supply an
530 * executor, {@code transform} will use a
531 * {@linkplain MoreExecutors#directExecutor direct executor}, which carries
532 * some caveats for heavier operations. For example, the call to {@code
533 * function.apply} may run on an unpredictable or undesirable thread:
534 *
535 * <ul>
536 * <li>If the input {@code Future} is done at the time {@code transform} is
537 * called, {@code transform} will call {@code function.apply} inline.
538 * <li>If the input {@code Future} is not yet done, {@code transform} will
539 * schedule {@code function.apply} to be run by the thread that completes the
540 * input {@code Future}, which may be an internal system thread such as an
541 * RPC network thread.
542 * </ul>
543 *
544 * <p>Also note that, regardless of which thread executes the {@code
545 * function.apply}, all other registered but unexecuted listeners are
546 * prevented from running during its execution, even if those listeners are
547 * to run in other executors.
548 *
549 * <p>The returned {@code Future} attempts to keep its cancellation state in
550 * sync with that of the input future and that of the future returned by the
551 * function. That is, if the returned {@code Future} is cancelled, it will
552 * attempt to cancel the other two, and if either of the other two is
553 * cancelled, the returned {@code Future} will receive a callback in which it
554 * will attempt to cancel itself.
555 *
556 * @param input The future to transform
557 * @param function A function to transform the result of the input future
558 * to the result of the output future
559 * @return A future that holds result of the function (if the input succeeded)
560 * or the original input's failure (if not)
561 * @since 11.0
562 */
563 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
564 AsyncFunction<? super I, ? extends O> function) {
565 ChainingListenableFuture<I, O> output =
566 new ChainingListenableFuture<I, O>(function, input);
567 input.addListener(output, directExecutor());
568 return output;
569 }
570
571 /**
572 * Returns a new {@code ListenableFuture} whose result is asynchronously
573 * derived from the result of the given {@code Future}. More precisely, the
574 * returned {@code Future} takes its result from a {@code Future} produced by
575 * applying the given {@code AsyncFunction} to the result of the original
576 * {@code Future}. Example:
577 *
578 * <pre> {@code
579 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
580 * AsyncFunction<RowKey, QueryResult> queryFunction =
581 * new AsyncFunction<RowKey, QueryResult>() {
582 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
583 * return dataService.read(rowKey);
584 * }
585 * };
586 * ListenableFuture<QueryResult> queryFuture =
587 * transform(rowKeyFuture, queryFunction, executor);}</pre>
588 *
589 * <p>The returned {@code Future} attempts to keep its cancellation state in
590 * sync with that of the input future and that of the future returned by the
591 * chain function. That is, if the returned {@code Future} is cancelled, it
592 * will attempt to cancel the other two, and if either of the other two is
593 * cancelled, the returned {@code Future} will receive a callback in which it
594 * will attempt to cancel itself.
595 *
596 * <p>When the execution of {@code function.apply} is fast and lightweight
597 * (though the {@code Future} it returns need not meet these criteria),
598 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
599 * the executor} or explicitly specifying {@code directExecutor}.
600 * However, be aware of the caveats documented in the link above.
601 *
602 * @param input The future to transform
603 * @param function A function to transform the result of the input future
604 * to the result of the output future
605 * @param executor Executor to run the function in.
606 * @return A future that holds result of the function (if the input succeeded)
607 * or the original input's failure (if not)
608 * @since 11.0
609 */
610 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
611 AsyncFunction<? super I, ? extends O> function,
612 Executor executor) {
613 checkNotNull(executor);
614 ChainingListenableFuture<I, O> output =
615 new ChainingListenableFuture<I, O>(function, input);
616 input.addListener(rejectionPropagatingRunnable(output, output, executor), directExecutor());
617 return output;
618 }
619
620 /**
621 * Returns a Runnable that will invoke the delegate Runnable on the delegate executor, but if the
622 * task is rejected, it will propagate that rejection to the output future.
623 */
624 private static Runnable rejectionPropagatingRunnable(
625 final AbstractFuture<?> outputFuture,
626 final Runnable delegateTask,
627 final Executor delegateExecutor) {
628 return new Runnable() {
629 @Override public void run() {
630 final AtomicBoolean thrownFromDelegate = new AtomicBoolean(true);
631 try {
632 delegateExecutor.execute(new Runnable() {
633 @Override public void run() {
634 thrownFromDelegate.set(false);
635 delegateTask.run();
636 }
637 });
638 } catch (RejectedExecutionException e) {
639 if (thrownFromDelegate.get()) {
640 // wrap exception?
641 outputFuture.setException(e);
642 }
643 // otherwise it must have been thrown from a transitive call and the delegate runnable
644 // should have handled it.
645 }
646 }
647 };
648 }
649
650 /**
651 * Returns a new {@code ListenableFuture} whose result is the product of
652 * applying the given {@code Function} to the result of the given {@code
653 * Future}. Example:
654 *
655 * <pre> {@code
656 * ListenableFuture<QueryResult> queryFuture = ...;
657 * Function<QueryResult, List<Row>> rowsFunction =
658 * new Function<QueryResult, List<Row>>() {
659 * public List<Row> apply(QueryResult queryResult) {
660 * return queryResult.getRows();
661 * }
662 * };
663 * ListenableFuture<List<Row>> rowsFuture =
664 * transform(queryFuture, rowsFunction);}</pre>
665 *
666 * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
667 * #transform(ListenableFuture, Function, Executor) supplying an executor}.
668 * If you do not supply an executor, {@code transform} will use an inline
669 * executor, which carries some caveats for heavier operations. For example,
670 * the call to {@code function.apply} may run on an unpredictable or
671 * undesirable thread:
672 *
673 * <ul>
674 * <li>If the input {@code Future} is done at the time {@code transform} is
675 * called, {@code transform} will call {@code function.apply} inline.
676 * <li>If the input {@code Future} is not yet done, {@code transform} will
677 * schedule {@code function.apply} to be run by the thread that completes the
678 * input {@code Future}, which may be an internal system thread such as an
679 * RPC network thread.
680 * </ul>
681 *
682 * <p>Also note that, regardless of which thread executes the {@code
683 * function.apply}, all other registered but unexecuted listeners are
684 * prevented from running during its execution, even if those listeners are
685 * to run in other executors.
686 *
687 * <p>The returned {@code Future} attempts to keep its cancellation state in
688 * sync with that of the input future. That is, if the returned {@code Future}
689 * is cancelled, it will attempt to cancel the input, and if the input is
690 * cancelled, the returned {@code Future} will receive a callback in which it
691 * will attempt to cancel itself.
692 *
693 * <p>An example use of this method is to convert a serializable object
694 * returned from an RPC into a POJO.
695 *
696 * @param input The future to transform
697 * @param function A Function to transform the results of the provided future
698 * to the results of the returned future. This will be run in the thread
699 * that notifies input it is complete.
700 * @return A future that holds result of the transformation.
701 * @since 9.0 (in 1.0 as {@code compose})
702 */
703 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
704 final Function<? super I, ? extends O> function) {
705 checkNotNull(function);
706 ChainingListenableFuture<I, O> output =
707 new ChainingListenableFuture<I, O>(asAsyncFunction(function), input);
708 input.addListener(output, directExecutor());
709 return output;
710 }
711
712 /**
713 * Returns a new {@code ListenableFuture} whose result is the product of
714 * applying the given {@code Function} to the result of the given {@code
715 * Future}. Example:
716 *
717 * <pre> {@code
718 * ListenableFuture<QueryResult> queryFuture = ...;
719 * Function<QueryResult, List<Row>> rowsFunction =
720 * new Function<QueryResult, List<Row>>() {
721 * public List<Row> apply(QueryResult queryResult) {
722 * return queryResult.getRows();
723 * }
724 * };
725 * ListenableFuture<List<Row>> rowsFuture =
726 * transform(queryFuture, rowsFunction, executor);}</pre>
727 *
728 * <p>The returned {@code Future} attempts to keep its cancellation state in
729 * sync with that of the input future. That is, if the returned {@code Future}
730 * is cancelled, it will attempt to cancel the input, and if the input is
731 * cancelled, the returned {@code Future} will receive a callback in which it
732 * will attempt to cancel itself.
733 *
734 * <p>An example use of this method is to convert a serializable object
735 * returned from an RPC into a POJO.
736 *
737 * <p>When the transformation is fast and lightweight, consider {@linkplain
738 * #transform(ListenableFuture, Function) omitting the executor} or
739 * explicitly specifying {@code directExecutor}. However, be aware of the
740 * caveats documented in the link above.
741 *
742 * @param input The future to transform
743 * @param function A Function to transform the results of the provided future
744 * to the results of the returned future.
745 * @param executor Executor to run the function in.
746 * @return A future that holds result of the transformation.
747 * @since 9.0 (in 2.0 as {@code compose})
748 */
749 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
750 final Function<? super I, ? extends O> function, Executor executor) {
751 checkNotNull(function);
752 return transform(input, asAsyncFunction(function), executor);
753 }
754
755 /** Wraps the given function as an AsyncFunction. */
756 private static <I, O> AsyncFunction<I, O> asAsyncFunction(
757 final Function<? super I, ? extends O> function) {
758 return new AsyncFunction<I, O>() {
759 @Override public ListenableFuture<O> apply(I input) {
760 O output = function.apply(input);
761 return immediateFuture(output);
762 }
763 };
764 }
765
766 /**
767 * Like {@link #transform(ListenableFuture, Function)} except that the
768 * transformation {@code function} is invoked on each call to
769 * {@link Future#get() get()} on the returned future.
770 *
771 * <p>The returned {@code Future} reflects the input's cancellation
772 * state directly, and any attempt to cancel the returned Future is likewise
773 * passed through to the input Future.
774 *
775 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
776 * only apply the timeout to the execution of the underlying {@code Future},
777 * <em>not</em> to the execution of the transformation function.
778 *
779 * <p>The primary audience of this method is callers of {@code transform}
780 * who don't have a {@code ListenableFuture} available and
781 * do not mind repeated, lazy function evaluation.
782 *
783 * @param input The future to transform
784 * @param function A Function to transform the results of the provided future
785 * to the results of the returned future.
786 * @return A future that returns the result of the transformation.
787 * @since 10.0
788 */
789 public static <I, O> Future<O> lazyTransform(final Future<I> input,
790 final Function<? super I, ? extends O> function) {
791 checkNotNull(input);
792 checkNotNull(function);
793 return new Future<O>() {
794
795 @Override
796 public boolean cancel(boolean mayInterruptIfRunning) {
797 return input.cancel(mayInterruptIfRunning);
798 }
799
800 @Override
801 public boolean isCancelled() {
802 return input.isCancelled();
803 }
804
805 @Override
806 public boolean isDone() {
807 return input.isDone();
808 }
809
810 @Override
811 public O get() throws InterruptedException, ExecutionException {
812 return applyTransformation(input.get());
813 }
814
815 @Override
816 public O get(long timeout, TimeUnit unit)
817 throws InterruptedException, ExecutionException, TimeoutException {
818 return applyTransformation(input.get(timeout, unit));
819 }
820
821 private O applyTransformation(I input) throws ExecutionException {
822 try {
823 return function.apply(input);
824 } catch (Throwable t) {
825 throw new ExecutionException(t);
826 }
827 }
828 };
829 }
830
831 /**
832 * An implementation of {@code ListenableFuture} that also implements
833 * {@code Runnable} so that it can be used to nest ListenableFutures.
834 * Once the passed-in {@code ListenableFuture} is complete, it calls the
835 * passed-in {@code Function} to generate the result.
836 *
837 * <p>For historical reasons, this class has a special case in its exception
838 * handling: If the given {@code AsyncFunction} throws an {@code
839 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
840 * and uses its <i>cause</i> as the output future's exception, rather than
841 * using the {@code UndeclaredThrowableException} itself as it would for other
842 * exception types. The reason for this is that {@code Futures.transform} used
843 * to require a {@code Function}, whose {@code apply} method is not allowed to
844 * throw checked exceptions. Nowadays, {@code Futures.transform} has an
845 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
846 * <i>is</i> allowed to throw checked exception. Users who wish to throw
847 * checked exceptions should use that overload instead, and <a
848 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
849 * should remove the {@code UndeclaredThrowableException} special case</a>.
850 */
851 private static class ChainingListenableFuture<I, O>
852 extends AbstractFuture<O> implements Runnable {
853
854 private AsyncFunction<? super I, ? extends O> function;
855 private ListenableFuture<? extends I> inputFuture;
856 private volatile ListenableFuture<? extends O> outputFuture;
857
858 private ChainingListenableFuture(
859 AsyncFunction<? super I, ? extends O> function,
860 ListenableFuture<? extends I> inputFuture) {
861 this.function = checkNotNull(function);
862 this.inputFuture = checkNotNull(inputFuture);
863 }
864
865 @Override
866 public boolean cancel(boolean mayInterruptIfRunning) {
867 /*
868 * Our additional cancellation work needs to occur even if
869 * !mayInterruptIfRunning, so we can't move it into interruptTask().
870 */
871 if (super.cancel(mayInterruptIfRunning)) {
872 // This should never block since only one thread is allowed to cancel
873 // this Future.
874 cancel(inputFuture, mayInterruptIfRunning);
875 cancel(outputFuture, mayInterruptIfRunning);
876 return true;
877 }
878 return false;
879 }
880
881 private void cancel(@Nullable Future<?> future,
882 boolean mayInterruptIfRunning) {
883 if (future != null) {
884 future.cancel(mayInterruptIfRunning);
885 }
886 }
887
888 @Override
889 public void run() {
890 try {
891 I sourceResult;
892 try {
893 sourceResult = getUninterruptibly(inputFuture);
894 } catch (CancellationException e) {
895 // Cancel this future and return.
896 // At this point, inputFuture is cancelled and outputFuture doesn't
897 // exist, so the value of mayInterruptIfRunning is irrelevant.
898 cancel(false);
899 return;
900 } catch (ExecutionException e) {
901 // Set the cause of the exception as this future's exception
902 setException(e.getCause());
903 return;
904 }
905
906 final ListenableFuture<? extends O> outputFuture = this.outputFuture =
907 Preconditions.checkNotNull(function.apply(sourceResult),
908 "AsyncFunction may not return null.");
909 if (isCancelled()) {
910 outputFuture.cancel(wasInterrupted());
911 this.outputFuture = null;
912 return;
913 }
914 outputFuture.addListener(new Runnable() {
915 @Override
916 public void run() {
917 try {
918 set(getUninterruptibly(outputFuture));
919 } catch (CancellationException e) {
920 // Cancel this future and return.
921 // At this point, inputFuture and outputFuture are done, so the
922 // value of mayInterruptIfRunning is irrelevant.
923 cancel(false);
924 return;
925 } catch (ExecutionException e) {
926 // Set the cause of the exception as this future's exception
927 setException(e.getCause());
928 } finally {
929 // Don't pin inputs beyond completion
930 ChainingListenableFuture.this.outputFuture = null;
931 }
932 }
933 }, directExecutor());
934 } catch (UndeclaredThrowableException e) {
935 // Set the cause of the exception as this future's exception
936 setException(e.getCause());
937 } catch (Throwable t) {
938 // This exception is irrelevant in this thread, but useful for the
939 // client
940 setException(t);
941 } finally {
942 // Don't pin inputs beyond completion
943 function = null;
944 inputFuture = null;
945 }
946 }
947 }
948
949 /**
950 * Returns a new {@code ListenableFuture} whose result is the product of
951 * calling {@code get()} on the {@code Future} nested within the given {@code
952 * Future}, effectively chaining the futures one after the other. Example:
953 *
954 * <pre> {@code
955 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
956 * ListenableFuture<String> dereferenced = dereference(nested);}</pre>
957 *
958 * <p>This call has the same cancellation and execution semantics as {@link
959 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
960 * Future} attempts to keep its cancellation state in sync with both the
961 * input {@code Future} and the nested {@code Future}. The transformation
962 * is very lightweight and therefore takes place in the same thread (either
963 * the thread that called {@code dereference}, or the thread in which the
964 * dereferenced future completes).
965 *
966 * @param nested The nested future to transform.
967 * @return A future that holds result of the inner future.
968 * @since 13.0
969 */
970 @SuppressWarnings({"rawtypes", "unchecked"})
971 public static <V> ListenableFuture<V> dereference(
972 ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
973 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
974 }
975
976 /**
977 * Helper {@code Function} for {@link #dereference}.
978 */
979 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
980 new AsyncFunction<ListenableFuture<Object>, Object>() {
981 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
982 return input;
983 }
984 };
985
986 /**
987 * Creates a new {@code ListenableFuture} whose value is a list containing the
988 * values of all its input futures, if all succeed. If any input fails, the
989 * returned future fails immediately.
990 *
991 * <p>The list of results is in the same order as the input list.
992 *
993 * <p>Canceling this future will attempt to cancel all the component futures,
994 * and if any of the provided futures fails or is canceled, this one is,
995 * too.
996 *
997 * @param futures futures to combine
998 * @return a future that provides a list of the results of the component
999 * futures
1000 * @since 10.0
1001 */
1002 @Beta
1003 public static <V> ListenableFuture<List<V>> allAsList(
1004 ListenableFuture<? extends V>... futures) {
1005 return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1006 }
1007
1008 /**
1009 * Creates a new {@code ListenableFuture} whose value is a list containing the
1010 * values of all its input futures, if all succeed. If any input fails, the
1011 * returned future fails immediately.
1012 *
1013 * <p>The list of results is in the same order as the input list.
1014 *
1015 * <p>Canceling this future will attempt to cancel all the component futures,
1016 * and if any of the provided futures fails or is canceled, this one is,
1017 * too.
1018 *
1019 * @param futures futures to combine
1020 * @return a future that provides a list of the results of the component
1021 * futures
1022 * @since 10.0
1023 */
1024 @Beta
1025 public static <V> ListenableFuture<List<V>> allAsList(
1026 Iterable<? extends ListenableFuture<? extends V>> futures) {
1027 return listFuture(ImmutableList.copyOf(futures), true, directExecutor());
1028 }
1029
1030 private static final class WrappedCombiner<T> implements Callable<T> {
1031 final Callable<T> delegate;
1032 CombinerFuture<T> outputFuture;
1033
1034 WrappedCombiner(Callable<T> delegate) {
1035 this.delegate = checkNotNull(delegate);
1036 }
1037
1038 @Override public T call() throws Exception {
1039 try {
1040 return delegate.call();
1041 } catch (ExecutionException e) {
1042 outputFuture.setException(e.getCause());
1043 } catch (CancellationException e) {
1044 outputFuture.cancel(false);
1045 }
1046 // at this point the return value doesn't matter since we already called setException or
1047 // cancel so the future is done.
1048 return null;
1049 }
1050 }
1051
1052 private static final class CombinerFuture<V> extends ListenableFutureTask<V> {
1053 ImmutableList<ListenableFuture<?>> futures;
1054
1055 CombinerFuture(Callable<V> callable, ImmutableList<ListenableFuture<?>> futures) {
1056 super(callable);
1057 this.futures = futures;
1058 }
1059
1060 @Override public boolean cancel(boolean mayInterruptIfRunning) {
1061 ImmutableList<ListenableFuture<?>> futures = this.futures;
1062 if (super.cancel(mayInterruptIfRunning)) {
1063 for (ListenableFuture<?> future : futures) {
1064 future.cancel(mayInterruptIfRunning);
1065 }
1066 return true;
1067 }
1068 return false;
1069 }
1070
1071 @Override protected void done() {
1072 super.done();
1073 futures = null;
1074 }
1075
1076 @Override protected void setException(Throwable t) {
1077 super.setException(t);
1078 }
1079 }
1080
1081 /**
1082 * Creates a new {@code ListenableFuture} whose result is set from the
1083 * supplied future when it completes. Cancelling the supplied future
1084 * will also cancel the returned future, but cancelling the returned
1085 * future will have no effect on the supplied future.
1086 *
1087 * @since 15.0
1088 */
1089 public static <V> ListenableFuture<V> nonCancellationPropagating(
1090 ListenableFuture<V> future) {
1091 return new NonCancellationPropagatingFuture<V>(future);
1092 }
1093
1094 /**
1095 * A wrapped future that does not propagate cancellation to its delegate.
1096 */
1097 private static class NonCancellationPropagatingFuture<V>
1098 extends AbstractFuture<V> {
1099 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1100 checkNotNull(delegate);
1101 addCallback(delegate, new FutureCallback<V>() {
1102 @Override
1103 public void onSuccess(V result) {
1104 set(result);
1105 }
1106
1107 @Override
1108 public void onFailure(Throwable t) {
1109 if (delegate.isCancelled()) {
1110 cancel(false);
1111 } else {
1112 setException(t);
1113 }
1114 }
1115 }, directExecutor());
1116 }
1117 }
1118
1119 /**
1120 * Creates a new {@code ListenableFuture} whose value is a list containing the
1121 * values of all its successful input futures. The list of results is in the
1122 * same order as the input list, and if any of the provided futures fails or
1123 * is canceled, its corresponding position will contain {@code null} (which is
1124 * indistinguishable from the future having a successful value of
1125 * {@code null}).
1126 *
1127 * <p>Canceling this future will attempt to cancel all the component futures.
1128 *
1129 * @param futures futures to combine
1130 * @return a future that provides a list of the results of the component
1131 * futures
1132 * @since 10.0
1133 */
1134 @Beta
1135 public static <V> ListenableFuture<List<V>> successfulAsList(
1136 ListenableFuture<? extends V>... futures) {
1137 return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1138 }
1139
1140 /**
1141 * Creates a new {@code ListenableFuture} whose value is a list containing the
1142 * values of all its successful input futures. The list of results is in the
1143 * same order as the input list, and if any of the provided futures fails or
1144 * is canceled, its corresponding position will contain {@code null} (which is
1145 * indistinguishable from the future having a successful value of
1146 * {@code null}).
1147 *
1148 * <p>Canceling this future will attempt to cancel all the component futures.
1149 *
1150 * @param futures futures to combine
1151 * @return a future that provides a list of the results of the component
1152 * futures
1153 * @since 10.0
1154 */
1155 @Beta
1156 public static <V> ListenableFuture<List<V>> successfulAsList(
1157 Iterable<? extends ListenableFuture<? extends V>> futures) {
1158 return listFuture(ImmutableList.copyOf(futures), false, directExecutor());
1159 }
1160
1161 /**
1162 * Returns a list of delegate futures that correspond to the futures received in the order
1163 * that they complete. Delegate futures return the same value or throw the same exception
1164 * as the corresponding input future returns/throws.
1165 *
1166 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1167 * does not correspond to a specific input future until the appropriate number of input
1168 * futures have completed. At that point, it is too late to cancel the input future.
1169 * The input future's result, which cannot be stored into the cancelled delegate future,
1170 * is ignored.
1171 *
1172 * @since 17.0
1173 */
1174 @Beta
1175 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1176 Iterable<? extends ListenableFuture<? extends T>> futures) {
1177 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an
1178 // ArrayDeque
1179 final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates =
1180 Queues.newConcurrentLinkedQueue();
1181 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1182 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1183 // atomically and therefore that each returned future is guaranteed to be in completion order.
1184 // N.B. there are some cases where the use of this executor could have possibly surprising
1185 // effects when input futures finish at approximately the same time _and_ the output futures
1186 // have directExecutor listeners. In this situation, the listeners may end up running on a
1187 // different thread than if they were attached to the corresponding input future. We believe
1188 // this to be a negligible cost since:
1189 // 1. Using the directExecutor implies that your callback is safe to run on any thread.
1190 // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1191 // a directExecutor listener on one of the output futures which is an antipattern anyway.
1192 SerializingExecutor executor = new SerializingExecutor(directExecutor());
1193 for (final ListenableFuture<? extends T> future : futures) {
1194 AsyncSettableFuture<T> delegate = AsyncSettableFuture.create();
1195 // Must make sure to add the delegate to the queue first in case the future is already done
1196 delegates.add(delegate);
1197 future.addListener(new Runnable() {
1198 @Override public void run() {
1199 delegates.remove().setFuture(future);
1200 }
1201 }, executor);
1202 listBuilder.add(delegate);
1203 }
1204 return listBuilder.build();
1205 }
1206
1207 /**
1208 * Registers separate success and failure callbacks to be run when the {@code
1209 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1210 * complete} or, if the computation is already complete, immediately.
1211 *
1212 * <p>There is no guaranteed ordering of execution of callbacks, but any
1213 * callback added through this method is guaranteed to be called once the
1214 * computation is complete.
1215 *
1216 * Example: <pre> {@code
1217 * ListenableFuture<QueryResult> future = ...;
1218 * addCallback(future,
1219 * new FutureCallback<QueryResult> {
1220 * public void onSuccess(QueryResult result) {
1221 * storeInCache(result);
1222 * }
1223 * public void onFailure(Throwable t) {
1224 * reportError(t);
1225 * }
1226 * });}</pre>
1227 *
1228 * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1229 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1230 * executor}. If you do not supply an executor, {@code addCallback} will use
1231 * a {@linkplain MoreExecutors#directExecutor direct executor}, which carries
1232 * some caveats for heavier operations. For example, the callback may run on
1233 * an unpredictable or undesirable thread:
1234 *
1235 * <ul>
1236 * <li>If the input {@code Future} is done at the time {@code addCallback} is
1237 * called, {@code addCallback} will execute the callback inline.
1238 * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1239 * schedule the callback to be run by the thread that completes the input
1240 * {@code Future}, which may be an internal system thread such as an RPC
1241 * network thread.
1242 * </ul>
1243 *
1244 * <p>Also note that, regardless of which thread executes the callback, all
1245 * other registered but unexecuted listeners are prevented from running
1246 * during its execution, even if those listeners are to run in other
1247 * executors.
1248 *
1249 * <p>For a more general interface to attach a completion listener to a
1250 * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1251 *
1252 * @param future The future attach the callback to.
1253 * @param callback The callback to invoke when {@code future} is completed.
1254 * @since 10.0
1255 */
1256 public static <V> void addCallback(ListenableFuture<V> future,
1257 FutureCallback<? super V> callback) {
1258 addCallback(future, callback, directExecutor());
1259 }
1260
1261 /**
1262 * Registers separate success and failure callbacks to be run when the {@code
1263 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1264 * complete} or, if the computation is already complete, immediately.
1265 *
1266 * <p>The callback is run in {@code executor}.
1267 * There is no guaranteed ordering of execution of callbacks, but any
1268 * callback added through this method is guaranteed to be called once the
1269 * computation is complete.
1270 *
1271 * Example: <pre> {@code
1272 * ListenableFuture<QueryResult> future = ...;
1273 * Executor e = ...
1274 * addCallback(future,
1275 * new FutureCallback<QueryResult> {
1276 * public void onSuccess(QueryResult result) {
1277 * storeInCache(result);
1278 * }
1279 * public void onFailure(Throwable t) {
1280 * reportError(t);
1281 * }
1282 * }, e);}</pre>
1283 *
1284 * <p>When the callback is fast and lightweight, consider {@linkplain
1285 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1286 * explicitly specifying {@code directExecutor}. However, be aware of the
1287 * caveats documented in the link above.
1288 *
1289 * <p>For a more general interface to attach a completion listener to a
1290 * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1291 *
1292 * @param future The future attach the callback to.
1293 * @param callback The callback to invoke when {@code future} is completed.
1294 * @param executor The executor to run {@code callback} when the future
1295 * completes.
1296 * @since 10.0
1297 */
1298 public static <V> void addCallback(final ListenableFuture<V> future,
1299 final FutureCallback<? super V> callback, Executor executor) {
1300 Preconditions.checkNotNull(callback);
1301 Runnable callbackListener = new Runnable() {
1302 @Override
1303 public void run() {
1304 final V value;
1305 try {
1306 // TODO(user): (Before Guava release), validate that this
1307 // is the thing for IE.
1308 value = getUninterruptibly(future);
1309 } catch (ExecutionException e) {
1310 callback.onFailure(e.getCause());
1311 return;
1312 } catch (RuntimeException e) {
1313 callback.onFailure(e);
1314 return;
1315 } catch (Error e) {
1316 callback.onFailure(e);
1317 return;
1318 }
1319 callback.onSuccess(value);
1320 }
1321 };
1322 future.addListener(callbackListener, executor);
1323 }
1324
1325 /**
1326 * Returns the result of {@link Future#get()}, converting most exceptions to a
1327 * new instance of the given checked exception type. This reduces boilerplate
1328 * for a common use of {@code Future} in which it is unnecessary to
1329 * programmatically distinguish between exception types or to extract other
1330 * information from the exception instance.
1331 *
1332 * <p>Exceptions from {@code Future.get} are treated as follows:
1333 * <ul>
1334 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1335 * {@code X} if the cause is a checked exception, an {@link
1336 * UncheckedExecutionException} if the cause is a {@code
1337 * RuntimeException}, or an {@link ExecutionError} if the cause is an
1338 * {@code Error}.
1339 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1340 * restoring the interrupt).
1341 * <li>Any {@link CancellationException} is propagated untouched, as is any
1342 * other {@link RuntimeException} (though {@code get} implementations are
1343 * discouraged from throwing such exceptions).
1344 * </ul>
1345 *
1346 * <p>The overall principle is to continue to treat every checked exception as a
1347 * checked exception, every unchecked exception as an unchecked exception, and
1348 * every error as an error. In addition, the cause of any {@code
1349 * ExecutionException} is wrapped in order to ensure that the new stack trace
1350 * matches that of the current thread.
1351 *
1352 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1353 * public constructor that accepts zero or more arguments, all of type {@code
1354 * String} or {@code Throwable} (preferring constructors with at least one
1355 * {@code String}) and calling the constructor via reflection. If the
1356 * exception did not already have a cause, one is set by calling {@link
1357 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1358 * {@code IllegalArgumentException} is thrown.
1359 *
1360 * @throws X if {@code get} throws any checked exception except for an {@code
1361 * ExecutionException} whose cause is not itself a checked exception
1362 * @throws UncheckedExecutionException if {@code get} throws an {@code
1363 * ExecutionException} with a {@code RuntimeException} as its cause
1364 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1365 * with an {@code Error} as its cause
1366 * @throws CancellationException if {@code get} throws a {@code
1367 * CancellationException}
1368 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1369 * RuntimeException} or does not have a suitable constructor
1370 * @since 10.0
1371 */
1372 public static <V, X extends Exception> V get(
1373 Future<V> future, Class<X> exceptionClass) throws X {
1374 checkNotNull(future);
1375 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1376 "Futures.get exception type (%s) must not be a RuntimeException",
1377 exceptionClass);
1378 try {
1379 return future.get();
1380 } catch (InterruptedException e) {
1381 currentThread().interrupt();
1382 throw newWithCause(exceptionClass, e);
1383 } catch (ExecutionException e) {
1384 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1385 throw new AssertionError();
1386 }
1387 }
1388
1389 /**
1390 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1391 * exceptions to a new instance of the given checked exception type. This
1392 * reduces boilerplate for a common use of {@code Future} in which it is
1393 * unnecessary to programmatically distinguish between exception types or to
1394 * extract other information from the exception instance.
1395 *
1396 * <p>Exceptions from {@code Future.get} are treated as follows:
1397 * <ul>
1398 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1399 * {@code X} if the cause is a checked exception, an {@link
1400 * UncheckedExecutionException} if the cause is a {@code
1401 * RuntimeException}, or an {@link ExecutionError} if the cause is an
1402 * {@code Error}.
1403 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1404 * restoring the interrupt).
1405 * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1406 * <li>Any {@link CancellationException} is propagated untouched, as is any
1407 * other {@link RuntimeException} (though {@code get} implementations are
1408 * discouraged from throwing such exceptions).
1409 * </ul>
1410 *
1411 * <p>The overall principle is to continue to treat every checked exception as a
1412 * checked exception, every unchecked exception as an unchecked exception, and
1413 * every error as an error. In addition, the cause of any {@code
1414 * ExecutionException} is wrapped in order to ensure that the new stack trace
1415 * matches that of the current thread.
1416 *
1417 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1418 * public constructor that accepts zero or more arguments, all of type {@code
1419 * String} or {@code Throwable} (preferring constructors with at least one
1420 * {@code String}) and calling the constructor via reflection. If the
1421 * exception did not already have a cause, one is set by calling {@link
1422 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1423 * {@code IllegalArgumentException} is thrown.
1424 *
1425 * @throws X if {@code get} throws any checked exception except for an {@code
1426 * ExecutionException} whose cause is not itself a checked exception
1427 * @throws UncheckedExecutionException if {@code get} throws an {@code
1428 * ExecutionException} with a {@code RuntimeException} as its cause
1429 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1430 * with an {@code Error} as its cause
1431 * @throws CancellationException if {@code get} throws a {@code
1432 * CancellationException}
1433 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1434 * RuntimeException} or does not have a suitable constructor
1435 * @since 10.0
1436 */
1437 public static <V, X extends Exception> V get(
1438 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1439 throws X {
1440 checkNotNull(future);
1441 checkNotNull(unit);
1442 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1443 "Futures.get exception type (%s) must not be a RuntimeException",
1444 exceptionClass);
1445 try {
1446 return future.get(timeout, unit);
1447 } catch (InterruptedException e) {
1448 currentThread().interrupt();
1449 throw newWithCause(exceptionClass, e);
1450 } catch (TimeoutException e) {
1451 throw newWithCause(exceptionClass, e);
1452 } catch (ExecutionException e) {
1453 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1454 throw new AssertionError();
1455 }
1456 }
1457
1458 private static <X extends Exception> void wrapAndThrowExceptionOrError(
1459 Throwable cause, Class<X> exceptionClass) throws X {
1460 if (cause instanceof Error) {
1461 throw new ExecutionError((Error) cause);
1462 }
1463 if (cause instanceof RuntimeException) {
1464 throw new UncheckedExecutionException(cause);
1465 }
1466 throw newWithCause(exceptionClass, cause);
1467 }
1468
1469 /**
1470 * Returns the result of calling {@link Future#get()} uninterruptibly on a
1471 * task known not to throw a checked exception. This makes {@code Future} more
1472 * suitable for lightweight, fast-running tasks that, barring bugs in the
1473 * code, will not fail. This gives it exception-handling behavior similar to
1474 * that of {@code ForkJoinTask.join}.
1475 *
1476 * <p>Exceptions from {@code Future.get} are treated as follows:
1477 * <ul>
1478 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1479 * {@link UncheckedExecutionException} (if the cause is an {@code
1480 * Exception}) or {@link ExecutionError} (if the cause is an {@code
1481 * Error}).
1482 * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1483 * call. The interrupt is restored before {@code getUnchecked} returns.
1484 * <li>Any {@link CancellationException} is propagated untouched. So is any
1485 * other {@link RuntimeException} ({@code get} implementations are
1486 * discouraged from throwing such exceptions).
1487 * </ul>
1488 *
1489 * <p>The overall principle is to eliminate all checked exceptions: to loop to
1490 * avoid {@code InterruptedException}, to pass through {@code
1491 * CancellationException}, and to wrap any exception from the underlying
1492 * computation in an {@code UncheckedExecutionException} or {@code
1493 * ExecutionError}.
1494 *
1495 * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1496 * {@link Uninterruptibles#getUninterruptibly(Future)}.
1497 *
1498 * @throws UncheckedExecutionException if {@code get} throws an {@code
1499 * ExecutionException} with an {@code Exception} as its cause
1500 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1501 * with an {@code Error} as its cause
1502 * @throws CancellationException if {@code get} throws a {@code
1503 * CancellationException}
1504 * @since 10.0
1505 */
1506 public static <V> V getUnchecked(Future<V> future) {
1507 checkNotNull(future);
1508 try {
1509 return getUninterruptibly(future);
1510 } catch (ExecutionException e) {
1511 wrapAndThrowUnchecked(e.getCause());
1512 throw new AssertionError();
1513 }
1514 }
1515
1516 private static void wrapAndThrowUnchecked(Throwable cause) {
1517 if (cause instanceof Error) {
1518 throw new ExecutionError((Error) cause);
1519 }
1520 /*
1521 * It's a non-Error, non-Exception Throwable. From my survey of such
1522 * classes, I believe that most users intended to extend Exception, so we'll
1523 * treat it like an Exception.
1524 */
1525 throw new UncheckedExecutionException(cause);
1526 }
1527
1528 /*
1529 * TODO(user): FutureChecker interface for these to be static methods on? If
1530 * so, refer to it in the (static-method) Futures.get documentation
1531 */
1532
1533 /*
1534 * Arguably we don't need a timed getUnchecked because any operation slow
1535 * enough to require a timeout is heavyweight enough to throw a checked
1536 * exception and therefore be inappropriate to use with getUnchecked. Further,
1537 * it's not clear that converting the checked TimeoutException to a
1538 * RuntimeException -- especially to an UncheckedExecutionException, since it
1539 * wasn't thrown by the computation -- makes sense, and if we don't convert
1540 * it, the user still has to write a try-catch block.
1541 *
1542 * If you think you would use this method, let us know.
1543 */
1544
1545 private static <X extends Exception> X newWithCause(
1546 Class<X> exceptionClass, Throwable cause) {
1547 // getConstructors() guarantees this as long as we don't modify the array.
1548 @SuppressWarnings("unchecked")
1549 List<Constructor<X>> constructors =
1550 (List) Arrays.asList(exceptionClass.getConstructors());
1551 for (Constructor<X> constructor : preferringStrings(constructors)) {
1552 @Nullable X instance = newFromConstructor(constructor, cause);
1553 if (instance != null) {
1554 if (instance.getCause() == null) {
1555 instance.initCause(cause);
1556 }
1557 return instance;
1558 }
1559 }
1560 throw new IllegalArgumentException(
1561 "No appropriate constructor for exception of type " + exceptionClass
1562 + " in response to chained exception", cause);
1563 }
1564
1565 private static <X extends Exception> List<Constructor<X>>
1566 preferringStrings(List<Constructor<X>> constructors) {
1567 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1568 }
1569
1570 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1571 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1572 @Override public Boolean apply(Constructor<?> input) {
1573 return asList(input.getParameterTypes()).contains(String.class);
1574 }
1575 }).reverse();
1576
1577 @Nullable private static <X> X newFromConstructor(
1578 Constructor<X> constructor, Throwable cause) {
1579 Class<?>[] paramTypes = constructor.getParameterTypes();
1580 Object[] params = new Object[paramTypes.length];
1581 for (int i = 0; i < paramTypes.length; i++) {
1582 Class<?> paramType = paramTypes[i];
1583 if (paramType.equals(String.class)) {
1584 params[i] = cause.toString();
1585 } else if (paramType.equals(Throwable.class)) {
1586 params[i] = cause;
1587 } else {
1588 return null;
1589 }
1590 }
1591 try {
1592 return constructor.newInstance(params);
1593 } catch (IllegalArgumentException e) {
1594 return null;
1595 } catch (InstantiationException e) {
1596 return null;
1597 } catch (IllegalAccessException e) {
1598 return null;
1599 } catch (InvocationTargetException e) {
1600 return null;
1601 }
1602 }
1603
1604 private interface FutureCombiner<V, C> {
1605 C combine(List<Optional<V>> values);
1606 }
1607
1608 private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1609 private static final Logger logger =
1610 Logger.getLogger(CombinedFuture.class.getName());
1611
1612 ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1613 final boolean allMustSucceed;
1614 final AtomicInteger remaining;
1615 FutureCombiner<V, C> combiner;
1616 List<Optional<V>> values;
1617 final Object seenExceptionsLock = new Object();
1618 Set<Throwable> seenExceptions;
1619
1620 CombinedFuture(
1621 ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1622 boolean allMustSucceed, Executor listenerExecutor,
1623 FutureCombiner<V, C> combiner) {
1624 this.futures = futures;
1625 this.allMustSucceed = allMustSucceed;
1626 this.remaining = new AtomicInteger(futures.size());
1627 this.combiner = combiner;
1628 this.values = Lists.newArrayListWithCapacity(futures.size());
1629 init(listenerExecutor);
1630 }
1631
1632 /**
1633 * Must be called at the end of the constructor.
1634 */
1635 protected void init(final Executor listenerExecutor) {
1636 // First, schedule cleanup to execute when the Future is done.
1637 addListener(new Runnable() {
1638 @Override
1639 public void run() {
1640 // Cancel all the component futures.
1641 if (CombinedFuture.this.isCancelled()) {
1642 for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1643 future.cancel(CombinedFuture.this.wasInterrupted());
1644 }
1645 }
1646
1647 // Let go of the memory held by other futures
1648 CombinedFuture.this.futures = null;
1649
1650 // By now the values array has either been set as the Future's value,
1651 // or (in case of failure) is no longer useful.
1652 CombinedFuture.this.values = null;
1653
1654 // The combiner may also hold state, so free that as well
1655 CombinedFuture.this.combiner = null;
1656 }
1657 }, directExecutor());
1658
1659 // Now begin the "real" initialization.
1660
1661 // Corner case: List is empty.
1662 if (futures.isEmpty()) {
1663 set(combiner.combine(ImmutableList.<Optional<V>>of()));
1664 return;
1665 }
1666
1667 // Populate the results list with null initially.
1668 for (int i = 0; i < futures.size(); ++i) {
1669 values.add(null);
1670 }
1671
1672 // Register a listener on each Future in the list to update
1673 // the state of this future.
1674 // Note that if all the futures on the list are done prior to completing
1675 // this loop, the last call to addListener() will callback to
1676 // setOneValue(), transitively call our cleanup listener, and set
1677 // this.futures to null.
1678 // This is not actually a problem, since the foreach only needs
1679 // this.futures to be non-null at the beginning of the loop.
1680 int i = 0;
1681 for (final ListenableFuture<? extends V> listenable : futures) {
1682 final int index = i++;
1683 listenable.addListener(new Runnable() {
1684 @Override
1685 public void run() {
1686 setOneValue(index, listenable);
1687 }
1688 }, listenerExecutor);
1689 }
1690 }
1691
1692 /**
1693 * Fails this future with the given Throwable if {@link #allMustSucceed} is
1694 * true. Also, logs the throwable if it is an {@link Error} or if
1695 * {@link #allMustSucceed} is {@code true}, the throwable did not cause
1696 * this future to fail, and it is the first time we've seen that particular Throwable.
1697 */
1698 private void setExceptionAndMaybeLog(Throwable throwable) {
1699 boolean visibleFromOutputFuture = false;
1700 boolean firstTimeSeeingThisException = true;
1701 if (allMustSucceed) {
1702 // As soon as the first one fails, throw the exception up.
1703 // The result of all other inputs is then ignored.
1704 visibleFromOutputFuture = super.setException(throwable);
1705
1706 synchronized (seenExceptionsLock) {
1707 if (seenExceptions == null) {
1708 seenExceptions = Sets.newHashSet();
1709 }
1710 firstTimeSeeingThisException = seenExceptions.add(throwable);
1711 }
1712 }
1713
1714 if (throwable instanceof Error
1715 || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
1716 logger.log(Level.SEVERE, "input future failed.", throwable);
1717 }
1718 }
1719
1720 /**
1721 * Sets the value at the given index to that of the given future.
1722 */
1723 private void setOneValue(int index, Future<? extends V> future) {
1724 List<Optional<V>> localValues = values;
1725 // TODO(user): This check appears to be redundant since values is
1726 // assigned null only after the future completes. However, values
1727 // is not volatile so it may be possible for us to observe the changes
1728 // to these two values in a different order... which I think is why
1729 // we need to check both. Clear up this craziness either by making
1730 // values volatile or proving that it doesn't need to be for some other
1731 // reason.
1732 if (isDone() || localValues == null) {
1733 // Some other future failed or has been cancelled, causing this one to
1734 // also be cancelled or have an exception set. This should only happen
1735 // if allMustSucceed is true or if the output itself has been
1736 // cancelled.
1737 checkState(allMustSucceed || isCancelled(),
1738 "Future was done before all dependencies completed");
1739 }
1740
1741 try {
1742 checkState(future.isDone(),
1743 "Tried to set value from future which is not done");
1744 V returnValue = getUninterruptibly(future);
1745 if (localValues != null) {
1746 localValues.set(index, Optional.fromNullable(returnValue));
1747 }
1748 } catch (CancellationException e) {
1749 if (allMustSucceed) {
1750 // Set ourselves as cancelled. Let the input futures keep running
1751 // as some of them may be used elsewhere.
1752 cancel(false);
1753 }
1754 } catch (ExecutionException e) {
1755 setExceptionAndMaybeLog(e.getCause());
1756 } catch (Throwable t) {
1757 setExceptionAndMaybeLog(t);
1758 } finally {
1759 int newRemaining = remaining.decrementAndGet();
1760 checkState(newRemaining >= 0, "Less than 0 remaining futures");
1761 if (newRemaining == 0) {
1762 FutureCombiner<V, C> localCombiner = combiner;
1763 if (localCombiner != null && localValues != null) {
1764 set(localCombiner.combine(localValues));
1765 } else {
1766 checkState(isDone());
1767 }
1768 }
1769 }
1770 }
1771 }
1772
1773 /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1774 private static <V> ListenableFuture<List<V>> listFuture(
1775 ImmutableList<ListenableFuture<? extends V>> futures,
1776 boolean allMustSucceed, Executor listenerExecutor) {
1777 return new CombinedFuture<V, List<V>>(
1778 futures, allMustSucceed, listenerExecutor,
1779 new FutureCombiner<V, List<V>>() {
1780 @Override
1781 public List<V> combine(List<Optional<V>> values) {
1782 List<V> result = Lists.newArrayList();
1783 for (Optional<V> element : values) {
1784 result.add(element != null ? element.orNull() : null);
1785 }
1786 return Collections.unmodifiableList(result);
1787 }
1788 });
1789 }
1790
1791 /**
1792 * A checked future that uses a function to map from exceptions to the
1793 * appropriate checked type.
1794 */
1795 private static class MappingCheckedFuture<V, X extends Exception> extends
1796 AbstractCheckedFuture<V, X> {
1797
1798 final Function<? super Exception, X> mapper;
1799
1800 MappingCheckedFuture(ListenableFuture<V> delegate,
1801 Function<? super Exception, X> mapper) {
1802 super(delegate);
1803
1804 this.mapper = checkNotNull(mapper);
1805 }
1806
1807 @Override
1808 protected X mapException(Exception e) {
1809 return mapper.apply(e);
1810 }
1811 }
1812 }